package gu.simplemq.redis;

import com.google.common.base.Strings;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import gu.simplemq.IAdvisor;
import gu.simplemq.pool.BaseMQPool;
import redis.clients.jedis.Jedis;

/**
 * {@link IAdvisor} implementation that answers "how many consumers does this channel have"
 * by asking {@code JedisUtils.scard} for the size of the entry stored under
 * {@code channelName + keySuffix}.
 * <p>
 * Instances are not created directly: {@link #topicAdvisorOf(BaseMQPool)} and
 * {@link #queueAdvisorOf(BaseMQPool)} hand out one cached advisor per pool, using
 * {@code SUBSCRIBER_SET_SUFFIX} and {@code CONSUMER_SET_SUFFIX} respectively as the key suffix.
 */
public class RedisConsumerAdvisor implements IAdvisor,RedisConstants {
	/** Per-pool advisors whose keys end with {@code SUBSCRIBER_SET_SUFFIX}. */
	private static final LoadingCache<BaseMQPool<Jedis>,RedisConsumerAdvisor> SUB_CACHE =
			newAdvisorCache(SUBSCRIBER_SET_SUFFIX);
	/** Per-pool advisors whose keys end with {@code CONSUMER_SET_SUFFIX}. */
	private static final LoadingCache<BaseMQPool<Jedis>,RedisConsumerAdvisor> QUEUE_CACHE =
			newAdvisorCache(CONSUMER_SET_SUFFIX);

	/** Pool used for every lookup made by this advisor. */
	private final JedisPoolLazy poolLazy;
	/** Text appended to a channel name to form the key that is counted. */
	private final String keySuffix;

	/**
	 * @param poolLazy  pool to query; it is cast to {@link JedisPoolLazy}, so any other
	 *                  {@code BaseMQPool} implementation fails here with a {@link ClassCastException}
	 * @param keySuffix suffix appended to the channel name when building the key
	 */
	private RedisConsumerAdvisor(BaseMQPool<Jedis> poolLazy, String keySuffix) {
		this.keySuffix = keySuffix;
		this.poolLazy = (JedisPoolLazy) poolLazy;
	}

	/**
	 * Builds an unbounded cache that lazily creates one advisor per pool.
	 *
	 * @param suffix key suffix given to every advisor the cache creates
	 * @return a new, empty loading cache
	 */
	private static LoadingCache<BaseMQPool<Jedis>,RedisConsumerAdvisor> newAdvisorCache(final String suffix) {
		CacheLoader<BaseMQPool<Jedis>, RedisConsumerAdvisor> advisorLoader =
				new CacheLoader<BaseMQPool<Jedis>, RedisConsumerAdvisor>() {
			@Override
			public RedisConsumerAdvisor load(BaseMQPool<Jedis> pool) throws Exception {
				return new RedisConsumerAdvisor(pool, suffix);
			}
		};
		return CacheBuilder.newBuilder().build(advisorLoader);
	}

	/**
	 * Returns the number of consumers recorded for a channel.
	 *
	 * @param channelName channel to look up
	 * @return {@code 0} when {@code channelName} is {@code null} or empty; otherwise the value
	 *         reported by {@code JedisUtils.scard} for {@code channelName + keySuffix},
	 *         narrowed to {@code int}
	 */
	@Override
	public int consumerCountOf(String channelName) {
		if (Strings.isNullOrEmpty(channelName)) {
			// nothing to look up without a channel name
			return 0;
		}
		final String countedKey = channelName + keySuffix;
		return (int) JedisUtils.scard(poolLazy, countedKey);
	}

	/**
	 * Alias of {@link #consumerCountOf(String)}; which key is counted depends only on the
	 * suffix this advisor was created with.
	 *
	 * @param channelName channel to look up
	 * @return same value as {@link #consumerCountOf(String)}
	 */
	public int subscriberCountOf(String channelName) {
		return consumerCountOf(channelName);
	}

	/**
	 * Returns the cached advisor for {@code pool} that counts keys ending with
	 * {@code SUBSCRIBER_SET_SUFFIX}, creating it on first use.
	 *
	 * @param pool pool the advisor should query
	 * @return the advisor bound to {@code pool}
	 */
	public static RedisConsumerAdvisor topicAdvisorOf(BaseMQPool<Jedis> pool){
		return SUB_CACHE.getUnchecked(pool);
	}

	/**
	 * Returns the cached advisor for {@code pool} that counts keys ending with
	 * {@code CONSUMER_SET_SUFFIX}, creating it on first use.
	 *
	 * @param pool pool the advisor should query
	 * @return the advisor bound to {@code pool}
	 */
	public static RedisConsumerAdvisor queueAdvisorOf(BaseMQPool<Jedis> pool){
		return QUEUE_CACHE.getUnchecked(pool);
	}
}
